package com.atguigu.flink.datastreamapi.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Created by Smexy on 2023/11/11
 *
 * KafkaSource: Flink acts as a Kafka consumer.
 *
 * Kafka stores everything as byte[] ----> read byte[] ----> ConsumerRecord(byte[] K, byte[] V) ----> deserialize.
 * Usually only the value part of the ConsumerRecord is needed,
 * and that value is usually a JSON string.
 * This demo deserializes BOTH the key and the value into a Tuple2&lt;String, String&gt;.
 */
public class Demo5_KafkaSourceWithKey
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        KafkaSource<Tuple2<String, String>> kafkaSource = KafkaSource
            .<Tuple2<String, String>>builder()
            .setBootstrapServers("hadoop102:9092,hadoop103:9092")
            .setTopics("t5")
            // Resume from committed offsets; fall back to EARLIEST if the group has none.
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            // Deserialize both the key and the value of each record.
            .setDeserializer(new KafkaRecordDeserializationSchema<Tuple2<String, String>>()
            {
                /**
                 * Turns the raw (byte[], byte[]) record into a (key, value) string tuple.
                 * Null-safe: Kafka allows messages without a key, and tombstone
                 * (delete-marker) records have a null value.
                 */
                @Override
                public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Tuple2<String, String>> out) throws IOException {
                    byte[] rawValue = record.value();
                    if (rawValue == null) {
                        // Tombstone record: nothing to emit downstream.
                        return;
                    }
                    byte[] rawKey = record.key();
                    // Flink's Tuple serializer rejects null fields, so map a
                    // missing key to the empty string instead of null.
                    String key = rawKey == null ? "" : new String(rawKey, StandardCharsets.UTF_8);
                    String value = new String(rawValue, StandardCharsets.UTF_8);
                    out.collect(Tuple2.of(key, value));
                }

                // Type information of the elements this source emits into the stream.
                @Override
                public TypeInformation<Tuple2<String, String>> getProducedType() {
                    return Types.TUPLE(Types.STRING, Types.STRING);
                }
            })
            .setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test4")
            // NOTE(review): auto-commit via the Kafka client bypasses Flink's
            // checkpoint-based offset commits; fine for a demo, not for
            // exactly-once pipelines.
            .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
            .setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
            .build();

        // Source name kept as-is ("kakfa" is a typo, but external metrics/UI
        // may already reference it).
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kakfa")
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate job failure instead of swallowing it — the original
            // printStackTrace() let the JVM exit with status 0 on failure.
            throw new RuntimeException("Flink job execution failed", e);
        }
    }
}
